package com.bfxy.paya.service.producer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Spring-managed wrapper around a RocketMQ {@link TransactionMQProducer}, used to
 * deliver distributed-transaction messages.
 *
 * <p>Lifecycle: the producer and its check-callback thread pool are created in the
 * constructor, the transaction listener is attached and the producer is started in
 * {@link #afterPropertiesSet()}, and everything is released in {@link #destroy()}
 * (or by calling {@link #shutdown()} directly).
 */
@Component
public class TransactionProducer implements InitializingBean, DisposableBean {

    /** Name server addresses the producer connects to, separated by semicolons. */
    private static final String NAMESERVER = "192.168.40.201:9876;192.168.40.202:9876;192.168.40.203:9876;192.168.40.204:9876;";

    /** Producer group name; also used as the prefix of the check-thread names. */
    private static final String PRODUCER_GROUP_NAME = "tx_pay_producer_group_name";

    /** The producer used to send transactional messages. */
    private final TransactionMQProducer producer;

    /** Thread pool handed to the producer for its transaction check callbacks. */
    private final ExecutorService executorService;

    /** Transaction callback implementation; a transactional producer requires one. */
    @Autowired
    private TransactionListenerImpl transactionListenerImpl;

    /**
     * Creates the producer and its check-callback thread pool and applies the
     * static configuration (executor and name server addresses).
     *
     * <p>The transaction listener is deliberately NOT set here: the
     * {@code @Autowired} field is still {@code null} while the constructor runs,
     * so that step is deferred to {@link #afterPropertiesSet()}.
     */
    private TransactionProducer() {
        this.producer = new TransactionMQProducer(PRODUCER_GROUP_NAME);

        // Pool arguments, in order: corePoolSize=2, maximumPoolSize=5,
        // keepAliveTime=100 (SECONDS), a bounded work queue of 2000 tasks, and the
        // thread factory that names the check threads. No rejection handler is
        // passed, so the executor's default applies.
        // NOTE(review): ThreadPoolExecutor is imported from
        // org.apache.tomcat.util.threads, not java.util.concurrent - confirm whether
        // the JDK class was intended.
        this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new CheckThreadFactory());

        this.producer.setExecutorService(this.executorService);
        this.producer.setNamesrvAddr(NAMESERVER);
    }

    /**
     * Attaches the injected transaction listener and starts the producer.
     *
     * @throws Exception if the producer fails to start; the failure is propagated
     *                   so that bean initialization fails instead of publishing a
     *                   producer that cannot send
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.producer.setTransactionListener(transactionListenerImpl);
        start();
    }

    /**
     * Starts the underlying producer.
     *
     * @throws MQClientException if the RocketMQ client cannot be started
     */
    private void start() throws MQClientException {
        this.producer.start();
    }

    /**
     * Container shutdown hook; delegates to {@link #shutdown()} so the producer that
     * was started in {@link #afterPropertiesSet()} is also stopped with the bean.
     */
    @Override
    public void destroy() {
        shutdown();
    }

    /**
     * Shuts down the producer and the check-callback thread pool created by this
     * class. The pool is shut down even if stopping the producer throws.
     */
    public void shutdown() {
        try {
            this.producer.shutdown();
        } finally {
            // ExecutorService.shutdown() has no additional effect if the pool has
            // already been shut down, so this is safe to repeat.
            this.executorService.shutdown();
        }
    }

    /**
     * Sends a message as part of a transaction.
     *
     * @param message  the message to send
     * @param argument an object passed along with the send call for the
     *                 transaction callback
     * @return the send result, or {@code null} if sending threw an exception
     */
    public TransactionSendResult sendMessage(Message message, Object argument) {
        TransactionSendResult sendResult = null;
        try {
            sendResult = this.producer.sendMessageInTransaction(message, argument);
        } catch (Exception e) {
            // Kept as best-effort: callers receive null on failure.
            // NOTE(review): replace with the project's logger once one is available here.
            e.printStackTrace();
        }
        return sendResult;
    }

    /**
     * Creates the threads of the check-callback pool, giving each one a distinct,
     * recognizable name ({@code <producer group>-check-thread-<n>}).
     */
    private static final class CheckThreadFactory implements ThreadFactory {

        /** Number of threads created so far; used as the name suffix. */
        private final AtomicInteger sequence = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(PRODUCER_GROUP_NAME + "-check-thread-" + sequence.incrementAndGet());
            return thread;
        }
    }

}
